Will
10/29/2021, 3:10 PM
Sahil Chopra
10/29/2021, 5:25 PM
I have a monitor_scrape task that monitors the status of a long-running job. At the completion of this task, I want to send out an email with the stats output by that task. I'm passing the output of the monitor_scrape task into my email task as the message, but the email seems to be firing independently (see diagram). Any pointers on what I might be doing wrong?
Flow Code:
completion_email_task = EmailTask(email_to='xxx', email_to_cc='xxx')
@task()
def monitor_scraper_run_status() -> Output:
    # task code here

def create_flow(flow_name: str) -> Flow:
    with Flow(
        flow_name,
        state_handlers=[gmail_notifier(only_states=[Running, Cancelled, Looped, Failed])],
    ) as flow:
        output = monitor_scraper_run_status()
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=output.__repr__())
    return flow
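A minimal sketch of one way to wire this up, assuming the stats can be formatted into a string by an intermediate task (format_stats is a hypothetical helper): calling output.__repr__() at flow-build time only stringifies the Task object and creates no dependency edge, whereas passing a task result into the email task gives Prefect both the ordering and the real stats to send.
@task()
def format_stats(stats) -> str:
    # Runs after monitor_scraper_run_status and turns its real output
    # into the email body at flow-run time.
    return f"Scrape run finished: {stats}"

def create_flow(flow_name: str) -> Flow:
    with Flow(flow_name) as flow:
        output = monitor_scraper_run_status()
        # Passing a task result creates a data dependency, so the email
        # only fires once the monitor task has finished.
        completion_email_task(subject="<SUBJECT_GOES_HERE>", msg=format_stats(output))
    return flow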
Payam K
10/29/2021, 8:00 PM
Jacob Bedard
10/29/2021, 11:02 PM
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Can someone let me know what this is if they've had this same issue?
My flow runs OK from the machine where I have the agent, but I can't get it to run when I attempt scheduling or even doing an ad-hoc run from the Prefect Cloud UI.
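A minimal sketch of the usual cause, assuming the flow passes something unpicklable (a client, session, or logger) between tasks or keeps it at module scope: orchestrated runs serialize task results and flow state with cloudpickle, so objects holding a _thread.lock can't cross task boundaries. One workaround is to construct such objects inside the task that uses them.
import requests
from prefect import Flow, task

@task
def fetch(url: str) -> str:
    # Build the unpicklable object (here a requests.Session) inside the task
    # instead of returning it from another task or creating it at module level.
    session = requests.Session()
    return session.get(url).text

@task
def count_bytes(body: str) -> int:
    # Only picklable values (strings, numbers, lists, ...) flow between tasks.
    return len(body)

with Flow("picklable-results") as flow:
    count_bytes(fetch("https://example.com"))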
haf
10/30/2021, 4:07 PM
A -> B[0..N] -> C[0..N]
Sometimes some index N fails and then C never runs, because at least one of the upstream Mapped tasks failed.
However, I'd always like to run C for the indices that worked, and I want the flow itself to be marked successful despite a few mapped runs failing.
How could I configure this?
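A minimal sketch of one way to get this behavior, assuming C can simply skip the failed indices (a, b, c below are hypothetical stand-ins for A, B, C): put a FilterTask between B and C to drop the failed children, and use set_reference_tasks so only C decides the flow's final state.
from prefect import Flow, task
from prefect.tasks.control_flow import FilterTask

@task
def a():
    return list(range(10))

@task
def b(x):
    if x == 3:
        raise ValueError("one child fails")
    return x * 2

@task
def c(x):
    return x + 1

# FilterTask runs on an all_finished trigger by default and drops exceptions,
# so C only maps over the B children that succeeded.
filter_successes = FilterTask(
    filter_func=lambda r: not isinstance(r, BaseException)
)

with Flow("partial-failure") as flow:
    bs = b.map(a())
    good = filter_successes(bs)
    cs = c.map(good)

# Make only C's results decide the flow's final state, so the flow is
# Successful even though some B children failed.
flow.set_reference_tasks([cs])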
William Grim
10/30/2021, 7:26 PM
prefect agent local start -l mylabel --show-flow-logs 2>&1 | tee /my/file/here.log
I've got a very long-running flow, and I don't see any output. Is Prefect buffering? Do I need to use stdbuf when running the process?
Mike Lev
10/30/2021, 7:44 PM
mr Memedi
10/30/2021, 10:13 PM
Fina Silva-Santisteban
10/31/2021, 2:37 PM
Eric Feldman
10/31/2021, 2:57 PM
I passed a pd.DataFrame to a task, but it failed when querying GraphQL since pd.DataFrame isn't JSON serializable:
~/Library/Caches/pypoetry/virtualenvs/recipe-t7nMXg6Y-py3.8/lib/python3.8/site-packages/prefect/client/client.py in graphql(self, query, raise_on_error, headers, variables, token, retry_on_api_error)
550 server=self.api_server,
551 headers=headers,
--> 552 params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
553 token=token,
554 retry_on_api_error=retry_on_api_error,
I can pass df.dict() and deserialize it, but I'd rather not.
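A minimal sketch of one workaround, assuming the DataFrame is being sent as a flow Parameter (so it has to survive json.dumps): pass it as a JSON string and rebuild the frame inside the task.
import pandas as pd
from prefect import Flow, Parameter, task

@task
def use_frame(df_json: str) -> int:
    # Rebuild the DataFrame from the JSON string inside the task.
    df = pd.read_json(df_json, orient="split")
    return len(df)

with Flow("df-as-parameter") as flow:
    # Parameter values must be JSON serializable, so hand Prefect a string.
    df_json = Parameter("df_json")
    use_frame(df_json)

df = pd.DataFrame({"a": [1, 2, 3]})
flow.run(parameters={"df_json": df.to_json(orient="split")})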
Todd de Quincey
10/31/2021, 5:39 PM
Harish
10/31/2021, 10:16 PM
Daniel Siles
11/01/2021, 9:15 AM
@task()
def transform_url(url):
    try:
        if '?' in url:
            return url[:url.find('?')]
    except Exception:
        pass
    return url

urls = [...]  # list with 1k urls

with Flow("flow") as flow:
    transformed_urls = transform_url.map(urls)

import time
start = time.time()
flow.run()
print(time.time() - start)

Execution time: 31.85s
Is there a way to reduce the overhead of a task?
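A minimal sketch of one way to cut that overhead, assuming the mapped work is thread-safe: run the flow on a LocalDaskExecutor so the 1k mapped children execute in parallel instead of serially on the default synchronous executor.
from prefect.executors import LocalDaskExecutor

# Mapped children run across worker threads, which amortizes the
# per-task orchestration overhead of the default executor.
flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=8))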
Marko Herkaliuk
11/01/2021, 9:27 AM
Eric Feldman
11/01/2021, 12:23 PM
Adam Brusselback
11/01/2021, 2:18 PM
Jean-Michel Provencher
11/01/2021, 3:00 PM
Joe
11/01/2021, 3:40 PM
prefect server config
I tried to create a project in the UI, but I get an "uh-oh" error; looking at the GraphQL response I can see the error is "Variable "$tenantId" of non-null type "UUID!" must not be null.". I think a tenant is a team, and if I try to update the team name from the profile page I see a similar GraphQL error: "Field update_tenant_name_input.tenant_id of required type UUID! was not provided". If I try to create a project using the CLI (prefect create project "Test Project"), I get the error: "You have not set an API key for authentication", and I can't see a way to set up an API key that doesn't relate to Prefect Cloud. If I try to create a tenant with the CLI (prefect server create-tenant --name="Test") I get: "To create a tenant with Prefect Cloud, please signup at ...". Have I somehow missed creating the 'default' team, and is there a way I can fix that?
Payam K
11/01/2021, 3:42 PM
Joe Still
11/01/2021, 5:31 PM
Vamsi Reddy
11/01/2021, 6:01 PM
with Flow(state_handler=[post_to_Slack]):
or
@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(minutes=5), state_handler=[post_to_slack])
def some_task():
    return something
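A minimal sketch of a task-level state handler, assuming a hypothetical post_to_slack helper and a placeholder webhook URL; note the keyword argument is state_handlers (plural), and the handler receives the object plus the old and new states.
import datetime
import requests
from prefect import task

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder

def post_to_slack(obj, old_state, new_state):
    # A state handler gets the task (or flow), the old state, and the new
    # state, and must return the state the run should continue with.
    if new_state.is_failed():
        requests.post(SLACK_WEBHOOK_URL, json={"text": f"{obj.name} failed: {new_state.message}"})
    return new_state

@task(
    log_stdout=True,
    max_retries=3,
    retry_delay=datetime.timedelta(minutes=5),
    state_handlers=[post_to_slack],  # note: plural keyword
)
def some_task():
    return "something"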
Andrey Kozhevnikov
11/01/2021, 6:20 PM
Guozheng Kuang
11/01/2021, 6:23 PM
Guozheng Kuang
11/01/2021, 6:23 PM
Vamsi Reddy
11/01/2021, 6:37 PM
Kevin Kho
11/01/2021, 9:30 PM
Phil
11/01/2021, 10:01 PM
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
If I got it right, Task D only gets a reference to the previous task object instead of the reduced list of DataFrames. The same logic with the task decorator, on the other hand, works without any problems. There is no difference between the two implementations, which is why I am quite puzzled by the different results. Please, can someone help me out? It's probably just a small thing, but I just can't find it 😕
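A minimal sketch of how nout is passed when the upstream task is defined as a class rather than with the decorator (SplitFrames is a hypothetical stand-in for the task feeding Task D):
from prefect import Flow, Task

class SplitFrames(Task):
    # Hypothetical task that returns two results.
    def run(self):
        return [1, 2, 3], [4, 5, 6]

with Flow("nout-example") as flow:
    # With the class-based API, nout goes to the constructor so the task's
    # output can be unpacked into separate downstream references.
    first, second = SplitFrames(nout=2)()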
Donny Flynn
11/01/2021, 10:25 PM
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    Task_with_secret_pulling()
The task failed with this error message: `Error during execution of task: ValueError("Could not infer an active Flow context while creating edge to <Task: LessThan>. This often means you called a task outside a with Flow(...)
block. If you're trying to run this task outside of a Flow context, you need to call `LessThan(...).run(...)`")`.
Then I brought all secrets to the flow level, which created them as separate tasks, but I am curious: is it Prefect's intention to have each "get" of a Secret be a separate task?
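A minimal sketch of the two usual patterns, assuming a secret named MY_API_KEY is configured: at the flow level a PrefectSecret is indeed its own task, while Secret(...).get() called inside another task's body is not.
from prefect import Flow, task
from prefect.client import Secret
from prefect.tasks.secrets import PrefectSecret

@task
def use_key(key):
    ...

@task
def use_key_inline():
    # Resolving the secret inside the task body does not add an extra task.
    key = Secret("MY_API_KEY").get()
    ...

with Flow("secrets") as flow:
    # PrefectSecret is itself a task, so each one shows up in the schematic.
    key = PrefectSecret("MY_API_KEY")
    use_key(key)
    use_key_inline()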
Todd de Quincey
11/02/2021, 6:36 AM
Charles Hunt
11/02/2021, 9:04 AM
Charles Hunt
11/02/2021, 9:04 AM
Anna Geller
11/02/2021, 9:44 AM
Charles Hunt
11/02/2021, 10:49 AM
# call the simpleingest endpoint
response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)
except:
    logger.error("General runtime error during API call.")
    raise RuntimeError("General runtime error during API call.")
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
# prefect libraries for accessing prefect runtime api
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# psycopg2 libraries for accessing PostgreSQL
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# used for date functions
from datetime import datetime, timedelta

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True

@task(log_stdout=True)
def ingest_task():
    logger = prefect.context.get("logger")

    apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
    headers = {
        'Content-type': 'application/json',
        'api-key': 'secretkey'
    }

    # get db config
    dbconfig = Secret("fsdb").get()

    # set up default hwm date and days
    hwmdate = (datetime.today() - timedelta(days=1)).strftime('%Y/%m/%d')
    days = 0

    # get db hwm
    with connect(f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']} password={dbconfig['password']} sslmode=require", cursor_factory=RealDictCursor) as pgcon:
        with pgcon.cursor() as pgcursor:
            # run the sql
            pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
            if pgcursor.rowcount > 0:
                # extract the result
                pgrecord = pgcursor.fetchone()
                logger.info(f"Control: High water mark date: {pgrecord['hwm']}")
                hwm = datetime.date(pgrecord['hwm'])
                hwmdate = hwm.strftime('%Y/%m/%d')
                days = (datetime.date(datetime.today()) - hwm).days
                logger.info(f"Number of days delta: {days}")

    if days > 0:
        requestdata = f"{{'JobName': 'delta_prod','DataJobId': 105, 'JobPathId': 'YYYYYYYY', 'DataJobType': 'Bounces', 'StartDate': '{hwmdate}', 'NumberOfDays': {days}, 'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ'}}"

        # log the endpoint
        logger.info(apiendpointurl)
        # log the inputs
        logger.info(requestdata)

        try:
            # call the simpleingest endpoint
            response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)
            if response.text.__contains__("completedSuccess") == False:
                logger.error(response.json())
                raise RuntimeError('SmartDataLoader API did not complete successfully')
        except RuntimeError as exc:
            logger.error("API runtime failure.")
            raise RuntimeError from exc
        except TimeoutError as exc:
            logger.error("API Timeout")
            raise TimeoutError from exc
        except:
            logger.error("General runtime error during API call.")
            raise RuntimeError("General runtime error during API call.")
        else:
            logger.info(response.json())
    else:
        logger.info("Up to date with delta")

with Flow("git-prod-bronto-bounces-flow") as flow:
    ingest_task()

flow.storage = GitHub(
    repo="XXXXXXX",
    path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)

flow.run_config = KubernetesRun(labels=["prod"], env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})

flow.register(project_name="YYYYYYYY")
03:26:13 ERROR ingest_task: General runtime error during API call.
03:26:13 ERROR CloudTaskRunner: Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<string>", line 64, in ingest_task
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
return request('post', url, data=data, json=json, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "<string>", line 78, in ingest_task
RuntimeError: General runtime error during API call.
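A minimal sketch of one mitigation for transient connection resets like this, assuming the ingest endpoint is safe to re-call: let Prefect retry the task instead of failing the run on the first ConnectionResetError.
from datetime import timedelta
from prefect import task

@task(log_stdout=True, max_retries=3, retry_delay=timedelta(minutes=2))
def ingest_task():
    # Same body as above; a raised RuntimeError now puts the task into a
    # Retrying state up to three times before it is marked Failed.
    ...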
Anna Geller
11/02/2021, 11:28 AM
Charles Hunt
11/02/2021, 11:37 AM
Anna Geller
11/02/2021, 11:41 AM
Charles Hunt
11/02/2021, 1:04 PM
Anna Geller
11/02/2021, 1:07 PM
prefect agent local start
But you may want to set up Supervisord to make it run as a background process:
• Docs: https://docs.prefect.io/orchestration/agents/local.html#flow-configuration
• Supervisor: https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor
Charles Hunt
11/05/2021, 12:00 PM
import socket
from urllib3.connection import HTTPConnection
TCP_KEEPALIVE = 0x10
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, TCP_KEEPALIVE, 60),
]
Anna Geller
11/05/2021, 12:10 PM
Charles Hunt
11/05/2021, 12:10 PM
Anna Geller
11/05/2021, 12:10 PM
apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
Charles Hunt
11/05/2021, 12:11 PM
apiendpointurl = 'http://localhost:5000/SimpleIngest'
Anna Geller
11/05/2021, 12:15 PM
Charles Hunt
11/05/2021, 12:26 PM
Anna Geller
11/05/2021, 12:32 PM